// Copyright 2022 Huawei Cloud Computing Technology Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <unistd.h>
#include <pthread.h>
#include <chrono>
#include "CasLog.h"
#include "CasHeartbeatThread.h"
#include "CasCmdController.h"
#include "../CasCommon.h"

using namespace std;

// Upper bound on the number of lag samples kept in the sliding window (see UpdateLag).
constexpr uint64_t MAX_LAGS_NUM = 5;
// Lag reported for a failed heartbeat round trip: 10 seconds, expressed in microseconds.
constexpr uint64_t MAX_LAG = 10ULL * 1000 * 1000;

// Creates the heartbeat object in the CAS_THREAD_INIT state. No worker thread is
// spawned here; that happens in Start().
//
// heartbeatController: used by TestLag() to issue heartbeat requests (nullptr is tolerated there).
// sock:                socket that TestLag() marks as disconnected after repeated failures
//                      (nullptr is tolerated there).
CasHeartbeatThread::CasHeartbeatThread(CasHeartbeatController *heartbeatController, CasSocket *sock)
{
    this->m_heartbeatController = heartbeatController;
    this->m_casSocket = sock;
    this->m_threadStatus = CAS_THREAD_INIT;
}

// Makes sure the worker thread, if one is still attached, has finished before the
// object goes away.
CasHeartbeatThread::~CasHeartbeatThread()
{
    if (!m_statLagTask.joinable()) {
        return;
    }
    // Any status other than RUNNING/STOP makes the TaskEntry loop terminate.
    SetThreadStatus(CAS_THREAD_INIT);
    m_statLagTask.join();
}

uint64_t CasHeartbeatThread::GetLag()
{
    return m_lag;
}

uint64_t CasHeartbeatThread::TestLag()
{
    if (this->m_threadStatus == CAS_THREAD_EXIT) {
        return 0;
    }
    struct timeval sent, recv;
    const int sleepTime = 20000;
    gettimeofday(&sent, nullptr);
    bool ret = false;
    if (m_heartbeatController != nullptr) {
        ret = m_heartbeatController->HeartBeatRequest(600);
    }

    usleep(sleepTime);
    gettimeofday(&recv, nullptr);
    if (!ret) {
        m_sendHbErrCount++;
        if (m_sendHbErrCount >= 5 && m_casSocket != nullptr) {
            INFO("Send or recv heartbeat failed.");
            m_casSocket->SetStatus(SOCKET_STATUS_DISCONNECT);
        }
        return MAX_LAG;
    }
    m_sendHbErrCount = 0;
    uint64_t lag = (uint64_t)recv.tv_usec + (uint64_t)recv.tv_sec * 1000000 -
        ((uint64_t)sent.tv_usec + (uint64_t)sent.tv_sec * 1000000) - sleepTime;
    return lag;
}

// Adds a lag sample to the sliding window and publishes the window's maximum as the
// current lag. Does nothing once the thread status is CAS_THREAD_EXIT.
//
// lag: newly measured lag sample.
void CasHeartbeatThread::UpdateLag(uint64_t lag)
{
    if (this->m_threadStatus == CAS_THREAD_EXIT) {
        return;
    }

    // Keep at most MAX_LAGS_NUM samples: evict the oldest before appending.
    if (m_lagDeque.size() >= MAX_LAGS_NUM) {
        m_lagDeque.pop_front();
    }
    m_lagDeque.push_back(lag);

    uint64_t worst = 0;
    for (const auto &sample : m_lagDeque) {
        if (worst < sample) {
            worst = sample;
        }
    }
    this->m_lag = worst;
}

// Worker-thread body: keeps sampling the heartbeat lag while the object is RUNNING,
// idles while it is in STOP, and leaves the loop on any other status. On the way out
// the status is reset to CAS_THREAD_INIT.
//
// threadObj: heartbeat object driven by this thread.
void TaskEntry(CasHeartbeatThread *threadObj)
{
    for (;;) {
        const int status = threadObj->GetThreadStatus();
        if (status != CAS_THREAD_RUNNING && status != CAS_THREAD_STOP) {
            break;
        }

        if (status == CAS_THREAD_STOP) {
            // Paused: poll the status again after 100 ms.
            usleep(100 * 1000);
            continue;
        }

        // A zero result means "no sample" and is not fed into the window.
        const uint64_t sample = threadObj->TestLag();
        if (sample != 0) {
            threadObj->UpdateLag(sample);
        }
        usleep(1000);
    }
    threadObj->SetThreadStatus(CAS_THREAD_INIT);
    INFO("Stat lag task end.");
}

int CasHeartbeatThread::GetThreadStatus()
{
    return this->m_threadStatus;
}

void CasHeartbeatThread::SetThreadStatus(int status)
{
    this->m_threadStatus = status;
}

int CasHeartbeatThread::Start()
{
    if (this->m_threadStatus == CAS_THREAD_RUNNING) {
        return -1;
    }

    this->m_threadStatus = CAS_THREAD_INIT;
    this->m_statLagTask = std::thread(std::bind(&TaskEntry, this));
    this->m_threadStatus = CAS_THREAD_RUNNING;
    return 0;
}

int CasHeartbeatThread::Restart()
{
    if (this->m_threadStatus == CAS_THREAD_STOP) {
        this->m_threadStatus = CAS_THREAD_RUNNING;
        return 0;
    }
    return -1;
}

int CasHeartbeatThread::Stop()
{
    if (this->m_threadStatus == CAS_THREAD_RUNNING) {
        this->m_threadStatus = CAS_THREAD_STOP;
        return 0;
    }
    return -1;
}

int CasHeartbeatThread::Exit()
{
    if (this->m_threadStatus == CAS_THREAD_RUNNING || this->m_threadStatus == CAS_THREAD_STOP) {
        if (this->m_threadStatus != CAS_THREAD_EXIT) {
            this->m_threadStatus = CAS_THREAD_EXIT;
        }
        if(m_statLagTask.joinable()) {
            m_statLagTask.join();
            return 0;
        }
    }
    return -1;
}